package org.codehaus.activemq.service.impl;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import javax.jms.Destination;
import javax.jms.JMSException;
import org.codehaus.activemq.broker.BrokerClient;
import org.codehaus.activemq.filter.DestinationFilter;
import org.codehaus.activemq.message.ActiveMQDestination;
import org.codehaus.activemq.message.ActiveMQMessage;
import org.codehaus.activemq.message.ConsumerInfo;
import org.codehaus.activemq.service.MessageContainerManager;

/**
 * A {@link MessageContainerManager} decorator that keeps the last message
 * passed to {@link #sendMessage} for each valid destination in a cache, and
 * re-sends the cached message(s) whenever a consumer is added on a matching
 * destination (an "initial image").
 *
 * <p>A destination is considered valid when its topic/queue kind agrees with
 * the {@code topic} flag given at construction and, if a destination filter
 * was supplied, the filter matches it (see {@link #isValid}).
 *
 * <p>Every access to the cache made by this class is performed while holding
 * the monitor of the cache map itself. The map is supplied by the caller, so
 * other holders of the same map must presumably follow the same locking
 * convention - verify against the code that constructs this manager.
 */
public class InitialImageMessageContainerManager extends ProxyMessageContainerManager
{
  /** Destination -> last cached message; guarded by {@code synchronized (cache)}. */
  private final Map cache;

  /** {@code true} to handle topics only, {@code false} to handle non-topic destinations only. */
  private final boolean topic;

  /** Optional extra restriction on cached destinations; {@code null} means no restriction. */
  private final DestinationFilter destinationFilter;

  /**
   * Creates a manager that handles topic destinations.
   *
   * @param delegate the manager every operation is forwarded to
   * @param cache the map used to hold the last message per destination
   * @param destinationFilter optional filter restricting which destinations are cached, may be {@code null}
   */
  public InitialImageMessageContainerManager(MessageContainerManager delegate, Map cache, DestinationFilter destinationFilter)
  {
    this(delegate, cache, true, destinationFilter);
  }

  /**
   * Creates a manager for either topic or non-topic destinations.
   *
   * @param delegate the manager every operation is forwarded to
   * @param cache the map used to hold the last message per destination
   * @param topic the value {@code ActiveMQDestination.isTopic()} must return for a destination to be handled
   * @param destinationFilter optional filter restricting which destinations are cached, may be {@code null}
   */
  public InitialImageMessageContainerManager(MessageContainerManager delegate, Map cache, boolean topic, DestinationFilter destinationFilter) {
    super(delegate);
    this.cache = cache;
    this.topic = topic;
    this.destinationFilter = destinationFilter;
  }

  /**
   * Registers the consumer with the delegate and then sends it the cached
   * initial image(s) for its destination, if the destination is valid.
   *
   * <p>For a wildcard destination every cached message whose destination
   * matches the wildcard is sent; otherwise only the message cached under
   * exactly that destination, when one exists.
   *
   * @param client the client owning the consumer
   * @param info describes the consumer being added
   * @throws JMSException if the delegate or the sending of an initial image fails
   */
  public void addMessageConsumer(BrokerClient client, ConsumerInfo info) throws JMSException {
    super.addMessageConsumer(client, info);

    ActiveMQDestination destination = info.getDestination();
    if (!isValid(destination)) {
      return;
    }

    if (destination.isWildcard()) {
      DestinationFilter filter = DestinationFilter.parseFilter(destination);
      sendMatchingInitialImages(client, info, filter);
    }
    else {
      ActiveMQMessage message;
      synchronized (this.cache) {
        message = (ActiveMQMessage) this.cache.get(destination);
      }
      // Send outside the lock so the cache is not held during dispatch.
      if (message != null) {
        sendMessage(client, message);
      }
    }
  }

  /**
   * Caches the message under its destination (when that destination is valid)
   * and then forwards it to the delegate.
   *
   * @param client the client the message is sent on behalf of
   * @param message the message to cache and forward
   * @throws JMSException if the delegate fails to send the message
   */
  public void sendMessage(BrokerClient client, ActiveMQMessage message)
    throws JMSException
  {
    ActiveMQDestination destination = message.getJMSActiveMQDestination();
    if (isValid(destination)) {
      // Same monitor as the readers in addMessageConsumer and
      // sendMatchingInitialImages, so reads and writes never interleave.
      synchronized (this.cache) {
        this.cache.put(destination, message);
      }
    }
    super.sendMessage(client, message);
  }

  /**
   * Sends every cached message whose destination matches the given filter.
   *
   * <p>The matching messages are first copied out of the cache while holding
   * its lock and only sent afterwards. {@link #sendMessage} writes back into
   * the cache, and some {@code Map} implementations (for example an
   * access-ordered {@code LinkedHashMap}) treat a {@code put} on an existing
   * key as a structural modification, which would make an in-progress
   * iteration fail with {@code ConcurrentModificationException}. Sending
   * after the lock is released also keeps delegate dispatch out of the
   * critical section.
   *
   * @param client the client that should receive the initial images
   * @param info describes the consumer being added (not used by this implementation)
   * @param filter selects which cached destinations to send
   * @throws JMSException if sending one of the messages fails; remaining messages are not sent
   */
  protected void sendMatchingInitialImages(BrokerClient client, ConsumerInfo info, DestinationFilter filter)
    throws JMSException
  {
    List matches = new ArrayList();
    synchronized (this.cache) {
      for (Iterator iter = this.cache.entrySet().iterator(); iter.hasNext(); ) {
        Map.Entry entry = (Map.Entry) iter.next();
        Destination destination = (Destination) entry.getKey();
        if (filter.matches(destination)) {
          matches.add(entry.getValue());
        }
      }
    }

    for (Iterator iter = matches.iterator(); iter.hasNext(); ) {
      sendMessage(client, (ActiveMQMessage) iter.next());
    }
  }

  /**
   * Tells whether this manager handles the given destination.
   *
   * @param destination the destination to test, may be {@code null}
   * @return {@code true} if the destination is non-null, its
   *         {@code isTopic()} value equals this manager's {@code topic} flag,
   *         and the optional destination filter (if any) matches it
   */
  protected boolean isValid(ActiveMQDestination destination)
  {
    if (destination == null) {
      // Nothing to cache or look up; let the delegate deal with the message.
      return false;
    }
    if (destination.isTopic() != this.topic) {
      return false;
    }
    return this.destinationFilter == null || this.destinationFilter.matches(destination);
  }
}